package com.go.flink_code;

import com.go.pojo.DataReading;
import com.go.udfs.JsonToPojoFromKafka;
import com.go.utils.FlinkUtils;
import com.go.utils.MyKafkaDeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * ClassName: demo3
 * Description: Flink streaming job entry point. Builds a Kafka-backed stream via
 * {@link FlinkUtils#createKafkaStream}, runs each record through
 * {@link JsonToPojoFromKafka} to obtain {@link DataReading} objects, and prints the
 * device id of every reading.
 * Date: 2022/1/6
 * @author: Cason
 */
public class demo3 {
   /**
    * Wires up and launches the streaming pipeline.
    *
    * @param args command-line arguments; {@code args[0]} is required and is handed
    *             unchanged to {@code FlinkUtils.createKafkaStream}
    *             (NOTE(review): presumably a configuration file path -- confirm against FlinkUtils)
    * @throws IllegalArgumentException if no argument is supplied
    * @throws Exception if building or executing the Flink job fails
    */
   public static void main(String[] args) throws Exception {
      // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException,
      // and do so before any Flink setup is touched.
      if (args == null || args.length == 0) {
         throw new IllegalArgumentException(
               "Missing required argument: args[0] must be provided for FlinkUtils.createKafkaStream");
      }

      StreamExecutionEnvironment environment = FlinkUtils.environment;

      // Source: records deserialized by MyKafkaDeserializationSchema into Tuple2<String, String>.
      DataStream<Tuple2<String, String>> kafkaStream =
            FlinkUtils.createKafkaStream(args[0], MyKafkaDeserializationSchema.class);

      // Convert each Kafka record into a DataReading.
      SingleOutputStreamOperator<DataReading> readings = kafkaStream.process(new JsonToPojoFromKafka());

      // Keep only the device id of each reading and write it to stdout.
      SingleOutputStreamOperator<String> deviceIds = readings.map(DataReading::getDeviceId);
      deviceIds.print();

      // Nothing runs until execute() is called; the job name is kept as-is.
      environment.execute("firstWorldCount");
   }
}
